package com.smartlazy.delay.queue.core.service;

import com.smartlazy.delay.queue.core.bean.CircleQueue;
import com.smartlazy.delay.queue.core.bean.JobItem;
import com.smartlazy.delay.queue.core.config.DelayQueueProperties;
import com.smartlazy.delay.queue.core.constant.CommonConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.Serializable;
import java.util.Calendar;
import java.util.List;

/**
 * Entry point of the delay queue: stores job metadata through {@link DelayJob},
 * schedules job ids into buckets through {@link DelayBucket} and hands out due
 * jobs read from {@link ReadyQueue}.
 *
 * <p>{@link #initBucketName(CircleQueue)} must be called before {@link #push(JobItem)}
 * or {@link #pop(String)}, because both need a bucket name from the circle queue.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but its injected
 * collaborators are not {@code transient}; confirm whether serialization is
 * actually required.
 *
 * @author howe
 */
@Slf4j
@Service
public class DelayQueue implements Serializable {

    /** Multiplier applied to {@code delay} and {@code ttr}; long-typed so the product cannot overflow int. */
    private static final long MILLIS_PER_SECOND = 1000L;

    @Autowired
    private DelayBucket delayBucket;

    @Autowired
    private DelayJob delayJob;

    @Autowired
    private ReadyQueue readyQueue;

    @Autowired
    private DelayQueueProperties delayQueueProperties;

    /** Source of bucket names; assigned by {@link #initBucketName(CircleQueue)}. */
    private CircleQueue<String> circleQueue;

    /**
     * Adds a job to the queue.
     *
     * <p>The job's timestamp is set to {@code now + delay * 1000} (the delay is
     * treated as seconds, the timestamp as epoch milliseconds), its metadata is
     * stored, and its id is pushed to a bucket under that timestamp.
     *
     * @param job the job to schedule; must not be {@code null}
     * @return the result of the bucket push; always true when this method returns normally
     * @throws Exception if the bucket push does not report success or no bucket
     *                   name can be obtained
     */
    public Boolean push(JobItem job) throws Exception {
        long timestamp = System.currentTimeMillis() + (job.getDelay() * MILLIS_PER_SECOND);
        job.setTimestamp(timestamp);
        delayJob.putJob(job.getId(), job);

        String bucketKey = generateBucketName();
        Boolean retBucket = delayBucket.pushToBucket(bucketKey, timestamp, job.getId());
        // Boolean.TRUE.equals also treats a null result as failure instead of
        // failing with an NPE on auto-unboxing.
        if (!Boolean.TRUE.equals(retBucket)) {
            // NOTE(review): the metadata stored by putJob above is left in place on
            // this path; confirm whether it should be rolled back.
            throw new Exception("添加job到bucket失败 Data:" + job);
        }
        return retBucket;
    }

    /**
     * Polls the ready queue for a job.
     *
     * <p>Blocks on the ready queue using the configured block timeout. When a job
     * id is obtained and its metadata still exists, the id is pushed to a bucket
     * again with timestamp {@code now + ttr * 1000} before the job is returned
     * (presumably so it is delivered again unless it is removed in time; confirm
     * against the bucket scanner).
     *
     * @param topics passed through to the ready queue to select what to read
     * @return the job, or {@code null} if the ready queue yielded nothing or the
     *         job metadata no longer exists
     * @throws Exception if the bucket push does not report success or no bucket
     *                   name can be obtained
     */
    public JobItem pop(String topics) throws Exception {
        String jobId = readyQueue.blockPopFromReadyQueue(topics, delayQueueProperties.getBlockTimeout());
        // Nothing was ready.
        if (jobId == null || jobId.isEmpty()) {
            log.warn("轮询获取Job 队列为空");
            return null;
        }
        // Load the job metadata.
        JobItem ret = delayJob.getJob(jobId);
        // The metadata is gone, the job may have been removed already.
        if (ret == null) {
            log.warn("消息不存在, 可能已被删除.jobId:{}", jobId);
            return null;
        }
        String bucketKey = generateBucketName();
        long timestamp = System.currentTimeMillis() + (ret.getTtr() * MILLIS_PER_SECOND);
        Boolean retBucket = delayBucket.pushToBucket(bucketKey, timestamp, jobId);
        if (!Boolean.TRUE.equals(retBucket)) {
            throw new Exception("添加job到bucket失败 Data:" + ret);
        }
        log.warn("到时消息获取成功.jobId:{}", jobId);
        return ret;
    }

    /**
     * Removes a job's metadata.
     *
     * @param jobId id of the job to remove
     * @return the result reported by {@link DelayJob#removeJob}
     */
    public Boolean remove(String jobId) {
        return delayJob.removeJob(jobId);
    }

    /**
     * Looks up a job's metadata.
     *
     * @param jobId id of the job to look up
     * @return the result of {@link DelayJob#getJob}; {@link #pop(String)} treats
     *         {@code null} as "job does not exist"
     */
    public JobItem get(String jobId) {
        return delayJob.getJob(jobId);
    }

    /**
     * Stores the given circle queue and adds one bucket name per configured bucket
     * to it ({@code CommonConst.DEFAULT_BUCKET_KEY + i} for {@code i} in
     * {@code [0, bucketSize)}).
     *
     * @param circleQueue the queue that will hand out bucket names; must not be {@code null}
     */
    public void initBucketName(CircleQueue<String> circleQueue) {
        this.circleQueue = circleQueue;
        int bucketSize = delayQueueProperties.getBucketSize();
        for (int i = 0; i < bucketSize; i++) {
            circleQueue.add(CommonConst.DEFAULT_BUCKET_KEY + i);
        }
    }

    /**
     * Returns the bucket name to use for the next push.
     *
     * <p>Reads the element at index 0 of the circle queue's array and adds it to
     * the queue again (presumably rotating it to the tail so bucket names are used
     * in turn; confirm against {@code CircleQueue.add}).
     *
     * @return the selected bucket name
     * @throws Exception if the circle queue has not been initialized, is empty, or
     *                   its first slot is empty
     */
    public synchronized String generateBucketName() throws Exception {
        // Without this guard an uninitialized queue would surface as a bare NPE.
        if (circleQueue == null) {
            throw new Exception("取得BucketName失败");
        }
        final Object[] queueArray = circleQueue.getQueue();
        if (queueArray == null || queueArray.length == 0 || queueArray[0] == null) {
            throw new Exception("取得BucketName失败");
        }
        String bucketName = queueArray[0].toString();
        circleQueue.add(bucketName);
        return bucketName;
    }
}
